package com.nx.platform.es.biz.esspider.recycle;

import com.google.common.io.Files;
import com.nx.platform.es.biz.esspider.entity.Item;

import java.io.*;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

/**
 * File-backed {@link Recycler} that parks items in a translog so they can be replayed later.
 *
 * <p>Two files are kept under the configured data path:
 * <ul>
 *   <li>{@code data-translog.failed} - receives one {@code type,id} line per recycled item;</li>
 *   <li>{@code data-translog.temp} - holds the lines currently being replayed.</li>
 * </ul>
 *
 * <p>Locking: {@link #open()}, {@link #close()} and {@link #recycle(Map)} share the read lock
 * (each {@code Bin} method is additionally {@code synchronized}), while the move of lines from
 * the failed file into the temp file in {@link #recycle(Consumer)} runs under the write lock.
 *
 * <p>NOTE(review): {@code LOGGER} is not declared in this file; presumably it is inherited from
 * {@link Recycler} - confirm.
 *
 * @date 2018/04/13
 */
public class RecyclerImpl implements Recycler {

    private final ReadWriteLock lock;
    private final Bin failed;
    private final Bin temp;

    /**
     * Creates a recycler whose translog files live directly under {@code dataPath}.
     *
     * @param dataPath directory that holds (or will hold) the translog files
     */
    public RecyclerImpl(String dataPath) {
        lock = new ReentrantReadWriteLock();
        failed = new Bin(String.format("%s/data-translog.failed", dataPath));
        temp = new Bin(String.format("%s/data-translog.temp", dataPath));
    }

    /**
     * Creates both translog files (and their parent directories) if needed and opens them
     * for reading.
     *
     * @throws Exception if a file cannot be created or opened
     */
    @Override
    public void open() throws Exception {
        lock.readLock().lock();
        try {
            failed.readStarted();
            temp.readStarted();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Closes any reader or writer currently held on either translog file.
     *
     * @throws Exception if closing a reader fails
     */
    @Override
    public void close() throws Exception {
        lock.readLock().lock();
        try {
            try {
                failed.close();
            } finally {
                // Close temp even when closing failed threw, so its handles are not leaked.
                temp.close();
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Appends one {@code type,id} line per item to the failed translog.
     *
     * <p>I/O failures are logged and not propagated to the caller.
     *
     * @param items items to record; {@code null} or empty maps are ignored
     */
    @Override
    public void recycle(Map<Long, Item> items) {
        lock.readLock().lock();
        try {
            failed.write(items);
        } catch (IOException e) {
            LOGGER.error("recycle error", e);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Replays every pending line through {@code consumer}.
     *
     * <p>Under the write lock the failed translog is appended to the temp translog and then
     * truncated; if the copy fails the failed translog is left untouched. The temp translog
     * is then read from its beginning outside the lock and truncated once every line has been
     * consumed. When the consumer throws, the temp translog is not truncated, so its lines are
     * offered again on the next call (a line may therefore be delivered more than once).
     *
     * <p>Any exception is logged and not propagated to the caller.
     *
     * <p>NOTE(review): the temp translog is read without holding the lock, so overlapping
     * calls to this method look unsafe - confirm that callers invoke it from a single thread.
     *
     * @param consumer receives each pending line
     */
    @Override
    public void recycle(Consumer<String> consumer) {
        try {
            lock.writeLock().lock();
            try {
                // readTo throws when the copy could not be written, which skips the clear
                // below and keeps the pending lines in the failed translog.
                failed.readTo(temp);
                failed.clear();
            } finally {
                lock.writeLock().unlock();
            }
            for (String line = temp.readLine(); line != null; line = temp.readLine()) {
                consumer.accept(line);
            }
            temp.clear();
        } catch (Exception e) {
            LOGGER.error("recycle faild", e);
        }
    }

    /**
     * A single translog file that is lazily opened for either reading or writing.
     *
     * <p>All methods synchronize on the instance.
     */
    private static class Bin {

        private final String fileName;
        private BufferedReader in;
        private PrintWriter out;

        public Bin(String fileName) {
            this.fileName = fileName;
        }

        /** Opens a reader on the file, creating the file and its parent directories first. */
        private synchronized void readStarted() throws IOException {
            if (in == null) {
                File file = new File(fileName);
                Files.createParentDirs(file);
                if (!file.exists()) {
                    if (!file.createNewFile()) {
                        LOGGER.error("file_create_fale");
                    }
                }
                in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
            }
        }

        /** Closes the reader, if one is open. */
        private synchronized void readCompleted() throws IOException {
            if (in != null) {
                in.close();
                in = null;
            }
        }

        /**
         * Opens a writer on the file, creating the file and its parent directories first.
         *
         * @param append {@code true} to append, {@code false} to truncate the file on open
         */
        private synchronized void writeStarted(boolean append) throws IOException {
            if (out == null) {
                File file = new File(fileName);
                Files.createParentDirs(file);
                if (!file.exists()) {
                    if (!file.createNewFile()) {
                        LOGGER.error("file_create_fail");
                    }
                }
                out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file, append)));
            }
        }

        /** Flushes and closes the writer, if one is open. */
        private synchronized void writeCompleted() {
            if (out != null) {
                out.flush();
                out.close();
                out = null;
            }
        }

        /**
         * Flushes the open writer and surfaces any error it has recorded.
         *
         * <p>{@link PrintWriter} never throws from {@code println}/{@code flush}; it only
         * records that an error happened. Its error flag is never reset, so a failed writer
         * is discarded here and the next write opens a fresh one.
         *
         * @throws IOException if the writer reported an error
         */
        private synchronized void flushAndVerify() throws IOException {
            if (out != null && out.checkError()) {
                writeCompleted();
                throw new IOException("write to " + fileName + " failed");
            }
        }

        /**
         * Returns the next line of the file, or {@code null} at end of file.
         *
         * <p>Any open writer is closed first; a reader is opened at the start of the file if
         * none is open yet.
         */
        public synchronized String readLine() throws IOException {
            writeCompleted();
            readStarted();
            return in.readLine();
        }

        /**
         * Appends every line of this file to {@code other}.
         *
         * @param other the bin that receives the lines
         * @throws IOException if reading this file or writing to {@code other} fails
         */
        public synchronized void readTo(Bin other) throws IOException {
            writeCompleted();
            // Reopen the reader so that the copy starts at the beginning of the file.
            readCompleted();
            readStarted();
            other.readCompleted();
            other.writeStarted(true);
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                other.out.println(line);
            }
            other.flushAndVerify();
        }

        /**
         * Appends one {@code type,id} line per item to the file.
         *
         * @param items items to write; {@code null} or empty maps are ignored
         * @throws IOException if the file cannot be opened or the write fails
         */
        public synchronized void write(Map<Long, Item> items) throws IOException {
            if (items == null || items.isEmpty()) {
                return;
            }
            readCompleted();
            writeStarted(true);
            items.values().forEach((Item item) -> out.println(item.getType() + "," + item.getId()));
            flushAndVerify();
        }

        /** Truncates the file by reopening it in non-append mode, then closes it again. */
        public synchronized void clear() throws IOException {
            readCompleted();
            writeCompleted();
            writeStarted(false);
            out.flush();
            writeCompleted();
        }

        /** Closes the reader and the writer, whichever are open. */
        public synchronized void close() throws IOException {
            readCompleted();
            writeCompleted();
        }

    }

}
